---
title: "How to replicate tables with Sequin"
sidebarTitle: "Replicate tables"
description: "Learn how to build reliable Postgres change data capture pipelines with Sequin. From idempotent processing, to schema transformations, to error handling."
---

This guide shows you how to replicate tables between databases using Sequin for change data capture.

By the end, you'll have a pipeline that streams database changes to your destination database while handling common challenges like race conditions, schema transformations, and recovery from failures.

## When to use this guide

This approach works well when you need to:
- Sync data between systems while applying transformations
- Consolidate data from multiple Postgres tables or databases to a single destination
- Programmatically replicate data between development or staging environments

## Prerequisites

If you're self-hosting Sequin, you'll need:

1. [Sequin installed](/running-sequin)
2. [A database connected](/connect-postgres)
3. A destination database ready to receive updates
4. A sink destination (like SQS, Kafka, Redis, or HTTP)



<Note>
  If using SQS, be sure to use a FIFO queue.
</Note>

## Architecture overview

Your table replication pipeline will have these components:

1. **Source tables**: The table or schema in Postgres that you want to replicate
2. **Destination sink**: The message queue or webhook endpoint that delivers changes to your processing system (e.g. SQS, Kafka, or HTTP endpoint)
3. **Replication processor**: An application or service you write that receives changes and upserts to your destination table

## Create a sink

First, create a sink to the queue, stream, or webhook endpoint that you want to use to process changes:

<Steps>
  <Step title="Select the source">
    Select the table or schema you want to replicate.

    Optionally add SQL filters to sync a subset of your source table(s) to the destination.
  </Step>

  <Step title="Select the message type">
    Leave the default "Changes" message type selected.
  </Step>

  <Step title="Leave message grouping default">
    If your sink supports message grouping, leave the default option selected for "Message grouping".

    This will ensure that messages are [grouped by primary key](/reference/sinks/overview#grouping-and-ordering), helping eliminate race conditions as you write rows to your destination.
  </Step>

  <Step title="Specify backfill">
    To replicate data currently in your source table, specify `backfill`.

    <Note>
      Backfill [messages](/reference/messages) have an `action` of `read`.
    </Note>
  </Step>

  <Step title="Configure sink-specific settings">
    Configure sink-specific settings and click "Create Sink".
  </Step>
</Steps>

## Process changes

Once your sink is configured, changes from your source table will flow to your message queue or HTTP endpoint. Before implementing your change processor, consider these key requirements for reliable table replication:

### Important considerations

1. **Type casting**: JSON messages transport data types differently than Postgres, often with less specificity:
   - UUIDs arrive as strings
   - Timestamps/dates arrive as ISO-8601 strings
   - Decimals/numerics arrive as numbers

   You will need to cast types appropriately in your processor.

2. **Batch processing**: For better performance, batch your database operations:
   - Consider your message queue's batching capabilities (e.g., SQS batch size)
   - Group similar operations (inserts/updates vs deletes) and perform them in a single database operation

3. **Idempotency**: Always use `ON CONFLICT` clauses to handle duplicate messages safely. Most likely, your destination table will have the same primary key(s) as your source table. Make sure to include the primary key(s) in your `ON CONFLICT` clause. That will ensure that writes to the destination table are idempotent.

### Example: Basic 1-to-1 replication

First, create a destination table that matches your source schema:

```sql create_table.sql
create table replicated_products (
  product_id uuid primary key,  -- matches source table PK
  name text,
  price decimal,
  created_at timestamp
);
```

<Note>
  As you'll see in the next section, you want to ensure your destination table has a primary key or unique constraint that matches your source table to enable idempotent upserts.
</Note>

### Process changes

Here's an example of how to process changes to your destination table:

```python process_change.py
import json
import uuid

def process_change(change):
    product_id = uuid.UUID(change.record['id'])

    if change.action in ['insert', 'update', 'read']:
        # Map source table to appropriate column
        record = {
            'product_id': product_id,
            'data': json.dumps(change.record)
        }

        db.execute(f"""
            insert into replicated_products (product_id, data)
            values (%(product_id)s, %(data)s)
            on conflict (product_id) do update set
                data = excluded.data
        """, record)

    elif change.action == 'delete':
        db.execute(f"""
            delete from replicated_products
            where product_id = %(product_id)s
        """, {'product_id': product_id})
```

<Note>
  For better performance, consider batching multiple changes into a single database operation. Batching increases throughput while still maintaining transactional guarantees.
</Note>

With this approach, you can efficiently replicate changes from your source table to your destination table in ~25 lines of code.

### Example: Fan-in replication

When combining multiple source tables into a single destination table, you'll need to carefully consider your primary key strategy. Here are two approaches:

#### Option 1: Composite key with source identifier

This approach works well when all source tables share the same primary key type (e.g., UUID):

```sql
create table combined_records (
  source_table text,
  record_id uuid,
  data jsonb,
  primary key (source_table, record_id)
);
```

Here's how to process changes with this schema:

```python
def process_change(change):
    source_table = change.metadata.table_name
    record_id = uuid.UUID(change.record['id'])

    if change.action in ['insert', 'update', 'read']:
        # Map source table to appropriate column
        record = {
            'source_table': source_table,
            'record_id': record_id,
            'data': json.dumps(change.record)
        }

        db.execute("""
            insert into combined_records (source_table, record_id, data)
            values (%(source_table)s, %(record_id)s, %(data)s)
            on conflict (source_table, record_id) do update set
                data = excluded.data
        """, record)

    elif change.action == 'delete':
        db.execute("""
            delete from combined_records
            where source_table = %(source_table)s
            and record_id = %(record_id)s
        """, {'source_table': source_table, 'record_id': record_id})
```

#### Option 2: Separate primary key columns

For more complex scenarios where source tables have different primary key types or you need to maintain separate unique constraints:

```sql
create table combined_records (
  id serial primary key,
  users_id uuid,
  orders_id uuid,
  products_id uuid,
  data jsonb,
  constraint exactly_one_pk check (
    (users_id is not null)::integer +
    (orders_id is not null)::integer +
    (products_id is not null)::integer = 1
  ),
  constraint users_unique unique (users_id),
  constraint orders_unique unique (orders_id),
  constraint products_unique unique (products_id)
);
```

Here's how to process changes with this schema:

```python
def process_change(change):
    source_table = change.metadata.table_name
    record_id = uuid.UUID(change.record['id'])

    if change.action in ['insert', 'update', 'read']:
        # Map source table to appropriate column
        record = {
            f"{source_table}_id": record_id,
            'data': json.dumps(change.record)
        }

        db.execute(f"""
            insert into combined_records ({source_table}_id, data)
            values (%(id)s, %(data)s)
            on conflict ({source_table}_id) do update set
                data = excluded.data
        """, record)

    elif change.action == 'delete':
        db.execute(f"""
            delete from combined_records
            where {source_table}_id = %(id)s
        """, {'id': record_id})
```

## Verify your pipeline is working

If you specified a backfill, there should be messages in your stream ready for your system to process:

1. On the sink overview page, click the "Messages" tab. You should see messages flowing to your sink.
2. Check your destination table to ensure rows are being replicated as expected.

## Maintenance

### Re-syncing your destination

You may need to re-sync your destination in these scenarios:

1. **Schema changes**: Updates to source or destination schema
2. **Logic updates**: Changes to transformation logic
3. **Data recovery**: Recovering from processing errors

You can use a [backfill](/reference/backfills) to replay data from your source table through your pipeline.

## Next steps

See "[Deploy to production](/how-to/deploy-to-production)" for guidance on copying your local sink configuration to your production environment.